package streaming

import (
	"context"
	"time"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
	kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
	"github.com/milvus-io/milvus/internal/util/hookutil"
	"github.com/milvus-io/milvus/internal/util/streamingutil/util"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
	"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
)

var singleton WALAccesser = nil

// Init initializes the wal accesser with the given etcd client.
// should be called before any other operations.
func Init() {
	c, _ := kvfactory.GetEtcdAndPath()
	// init and select wal name
	util.InitAndSelectWALName()
	// register cipher for cipher message
	if hookutil.IsClusterEncyptionEnabled() {
		message.RegisterCipher(hookutil.GetCipher())
	}
	singleton = newWALAccesser(c)
}

// Release releases the resources of the wal accesser.
func Release() {
	if w, ok := singleton.(*walAccesserImpl); ok && w != nil {
		w.Close()
	}
}

// WAL is the entrance to interact with the milvus write ahead log.
func WAL() WALAccesser {
	return singleton
}

// AppendOption is the option for append operation.
type AppendOption struct {
	BarrierTimeTick uint64 // BarrierTimeTick is the barrier time tick of the message.
	// Must be allocated from tso, otherwise undetermined behavior.
}

type TxnOption struct {
	// VChannel is the target vchannel to write.
	// TODO: support cross-wal txn in future.
	VChannel string

	// Keepalive is the time to keepalive of the transaction.
	// If the txn don't append message in the keepalive time, the txn will be expired.
	// Only make sense when keepalive is greater than 1ms.
	// The default value is 0, which means the keepalive is setted by the wal at streaming node.
	Keepalive time.Duration
}

type ReadOption struct {
	// PChannel is the target pchannel to read, if the pchannel is not set.
	// It will be parsed from setted `VChannel`.
	PChannel string

	// VChannel is the target vchannel to read.
	// It must be set to read message from a vchannel.
	// If VChannel is empty, the PChannel must be set, and all message of pchannel will be read.
	VChannel string

	// DeliverPolicy is the deliver policy of the consumer.
	DeliverPolicy options.DeliverPolicy

	// DeliverFilters is the deliver filters of the consumer.
	DeliverFilters []options.DeliverFilter

	// Handler is the message handler used to handle message after recv from consumer.
	MessageHandler message.Handler
}

// Scanner is the interface for reading records from the wal.
type Scanner interface {
	// Done returns a channel which will be closed when scanner is finished or closed.
	Done() <-chan struct{}

	// Error returns the error of the scanner.
	Error() error

	// Close the scanner, release the underlying resources.
	Close()
}

// ReplicateService is the interface for the replicate service.
type ReplicateService interface {
	// Append appends the message into current cluster.
	Append(ctx context.Context, msg message.ReplicateMutableMessage) (*types.AppendResult, error)

	// UpdateReplicateConfiguration updates the replicate configuration to the milvus cluster.
	UpdateReplicateConfiguration(ctx context.Context, config *commonpb.ReplicateConfiguration) error

	// GetReplicateCheckpoint returns the WAL checkpoint that will be used to create scanner
	// from the correct position, ensuring no duplicate or missing messages.
	GetReplicateCheckpoint(ctx context.Context, channelName string) (*wal.ReplicateCheckpoint, error)
}

// Balancer is the interface for managing the balancer of the wal.
type Balancer interface {
	// ListStreamingNode lists the streaming node.
	ListStreamingNode(ctx context.Context) ([]types.StreamingNodeInfo, error)

	// GetWALDistribution returns the wal distribution of the streaming node.
	GetWALDistribution(ctx context.Context, nodeID int64) (*types.StreamingNodeAssignment, error)

	// GetFrozenNodeIDs returns the frozen node ids.
	GetFrozenNodeIDs(ctx context.Context) ([]int64, error)

	// IsRebalanceSuspended returns whether the rebalance of the wal is suspended.
	IsRebalanceSuspended(ctx context.Context) (bool, error)

	// SuspendRebalance suspends the rebalance of the wal.
	SuspendRebalance(ctx context.Context) error

	// ResumeRebalance resumes the rebalance of the wal.
	ResumeRebalance(ctx context.Context) error

	// FreezeNodeIDs freezes the streaming node.
	// The wal will not be assigned to the frozen nodes and the wal will be removed from the frozen nodes.
	FreezeNodeIDs(ctx context.Context, nodeIDs []int64) error

	// DefreezeNodeIDs defreezes the streaming node.
	DefreezeNodeIDs(ctx context.Context, nodeIDs []int64) error
}

// WALAccesser is the interfaces to interact with the milvus write ahead log.
type WALAccesser interface {
	// ForwardService returns the forward service of the wal.
	ForwardService() ForwardService

	// Replicate returns the replicate service of the wal.
	Replicate() ReplicateService

	// ControlChannel returns the control channel name of the wal.
	// It will return the channel name of the control channel of the wal.
	ControlChannel() string

	// Balancer returns the balancer management of the wal.
	Balancer() Balancer

	// Local returns the local services.
	Local() Local

	// Txn returns a transaction for writing records to one vchannel.
	// It promises the atomicity written of the messages.
	// Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal.
	Txn(ctx context.Context, opts TxnOption) (Txn, error)

	// RawAppend writes a records to the log.
	RawAppend(ctx context.Context, msgs message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error)

	// Broadcast returns a broadcast service of wal.
	// Broadcast support cross-vchannel message broadcast.
	// It promises the atomicity written of the messages and eventual consistency.
	// And the broadcasted message must be acked cat consuming-side, otherwise resource leak on broadcast.
	// Broadcast also support the resource-key to achieve a resource-exclusive acquirsion.
	Broadcast() Broadcast

	// Read returns a scanner for reading records from the wal.
	Read(ctx context.Context, opts ReadOption) Scanner

	// AppendMessages appends messages to the wal.
	// It it a helper utility function to append messages to the wal.
	// If the messages is belong to one vchannel, it will be sent as a transaction.
	// Otherwise, it will be sent as individual messages.
	// !!! This function do not promise the atomicity and deliver order of the messages appending.
	AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses

	// AppendMessagesWithOption appends messages to the wal with the given option.
	// Same with AppendMessages, but with the given option.
	AppendMessagesWithOption(ctx context.Context, opts AppendOption, msgs ...message.MutableMessage) AppendResponses
}

type Local interface {
	// GetLatestMVCCTimestampIfLocal gets the latest mvcc timestamp of the vchannel.
	// If the wal is located at remote, it will return 0, error.
	GetLatestMVCCTimestampIfLocal(ctx context.Context, vchannel string) (uint64, error)

	// GetMetricsIfLocal gets the metrics of the local wal.
	// It will only return the metrics of the local wal but not the remote wal.
	GetMetricsIfLocal(ctx context.Context) (*types.StreamingNodeMetrics, error)
}

// Broadcast is the interface for writing broadcast message into the wal.
type Broadcast interface {
	// Append of Broadcast sends a broadcast message to all target vchannels.
	// Guarantees the atomicity written of the messages and eventual consistency.
	// The resource-key bound at the message will be held as a mutex until the message is broadcasted to all vchannels,
	// so the other append operation with the same resource-key will be searialized with a deterministic order on every vchannel.
	// The Append operation will be blocked until the message is consumed and acknowledged by the flusher at streamingnode.
	Append(ctx context.Context, msg message.BroadcastMutableMessage) (*types.BroadcastAppendResult, error)

	// Ack acknowledges a broadcast message at the specified vchannel.
	// It must be called after the message is comsumed by the unique-consumer.
	// It will only return error when the ctx is canceled.
	Ack(ctx context.Context, msg message.ImmutableMessage) error
}

// Txn is the interface for writing transaction into the wal.
type Txn interface {
	// Append writes a record to the log.
	Append(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) error

	// Commit commits the transaction.
	// Commit and Rollback can be only call once, and not concurrent safe with append operation.
	Commit(ctx context.Context) (*types.AppendResult, error)

	// Rollback rollbacks the transaction.
	// Commit and Rollback can be only call once, and not concurrent safe with append operation.
	// TODO: Manually rollback is make no sense for current single wal txn.
	// It is preserved for future cross-wal txn.
	Rollback(ctx context.Context) error
}
